use std::ops::{ControlFlow, Deref};

use polars_core::frame::row::Row;
use polars_core::prelude::*;
use polars_lazy::prelude::*;
use polars_ops::frame::JoinCoalesce;
use polars_plan::dsl::function_expr::StructFunction;
use polars_plan::prelude::*;
use polars_utils::format_pl_smallstr;
use sqlparser::ast::{
    BinaryOperator, CreateTable, Delete, Distinct, ExcludeSelectItem, Expr as SQLExpr, FromTable,
    FunctionArg, GroupByExpr, Ident, JoinConstraint, JoinOperator, NamedWindowDefinition,
    NamedWindowExpr, ObjectName, ObjectType, Offset, OrderBy, Query, RenameSelectItem, Select,
    SelectItem, SetExpr, SetOperator, SetQuantifier, Statement, TableAlias, TableFactor,
    TableWithJoins, UnaryOperator, Value as SQLValue, Values, Visit, Visitor,
    WildcardAdditionalOptions, WindowSpec,
};
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::{Parser, ParserOptions};

use crate::function_registry::{DefaultFunctionRegistry, FunctionRegistry};
use crate::sql_expr::{
    parse_sql_array, parse_sql_expr, resolve_compound_identifier, to_sql_interface_err,
};
use crate::table_functions::PolarsTableFunctions;
use crate::types::map_sql_dtype_to_polars;

#[derive(Clone)]
pub struct TableInfo {
    pub(crate) frame: LazyFrame,
    pub(crate) name: PlSmallStr,
    pub(crate) schema: Arc<Schema>,
}

struct SelectModifiers {
    exclude: PlHashSet<String>,                // SELECT * EXCLUDE
    ilike: Option<regex::Regex>,               // SELECT * ILIKE
    rename: PlHashMap<PlSmallStr, PlSmallStr>, // SELECT * RENAME
    replace: Vec<Expr>,                        // SELECT * REPLACE
}
impl SelectModifiers {
    fn matches_ilike(&self, s: &str) -> bool {
        match &self.ilike {
            Some(rx) => rx.is_match(s),
            None => true,
        }
    }
    fn renamed_cols(&self) -> Vec<Expr> {
        self.rename
            .iter()
            .map(|(before, after)| col(before.clone()).alias(after.clone()))
            .collect()
    }
}

/// The SQLContext is the main entry point for executing SQL queries.
#[derive(Clone)]
pub struct SQLContext {
    pub(crate) table_map: PlHashMap<String, LazyFrame>,
    pub(crate) function_registry: Arc<dyn FunctionRegistry>,
    pub(crate) lp_arena: Arena<IR>,
    pub(crate) expr_arena: Arena<AExpr>,

    cte_map: PlHashMap<String, LazyFrame>,
    table_aliases: PlHashMap<String, String>,
    joined_aliases: PlHashMap<String, PlHashMap<String, String>>,
    pub(crate) named_windows: PlHashMap<String, WindowSpec>,
}

impl Default for SQLContext {
    fn default() -> Self {
        Self {
            function_registry: Arc::new(DefaultFunctionRegistry {}),
            table_map: Default::default(),
            cte_map: Default::default(),
            table_aliases: Default::default(),
            joined_aliases: Default::default(),
            named_windows: Default::default(),
            lp_arena: Default::default(),
            expr_arena: Default::default(),
        }
    }
}

impl SQLContext {
    /// Create a new SQLContext.
    /// ```rust
    /// # use polars_sql::SQLContext;
    /// # fn main() {
    /// let ctx = SQLContext::new();
    /// # }
    /// ```
    pub fn new() -> Self {
        Self::default()
    }

    /// Get the names of all registered tables, in sorted order.
    pub fn get_tables(&self) -> Vec<String> {
        let mut tables = Vec::from_iter(self.table_map.keys().cloned());
        tables.sort_unstable();
        tables
    }

    /// Register a [`LazyFrame`] as a table in the SQLContext.
    /// ```rust
    /// # use polars_sql::SQLContext;
    /// # use polars_core::prelude::*;
    /// # use polars_lazy::prelude::*;
    /// # fn main() {
    ///
    /// let mut ctx = SQLContext::new();
    /// let df = df! {
    ///    "a" =>  [1, 2, 3],
    /// }.unwrap().lazy();
    ///
    /// ctx.register("df", df);
    /// # }
    ///```
    pub fn register(&mut self, name: &str, lf: LazyFrame) {
        self.table_map.insert(name.to_owned(), lf);
    }

    /// Unregister a [`LazyFrame`] table from the [`SQLContext`].
    pub fn unregister(&mut self, name: &str) {
        self.table_map.remove(&name.to_owned());
    }

    /// Execute a SQL query, returning a [`LazyFrame`].
    /// ```rust
    /// # use polars_sql::SQLContext;
    /// # use polars_core::prelude::*;
    /// # use polars_lazy::prelude::*;
    /// # fn main() {
    ///
    /// let mut ctx = SQLContext::new();
    /// let df = df! {
    ///    "a" =>  [1, 2, 3],
    /// }
    /// .unwrap();
    ///
    /// ctx.register("df", df.clone().lazy());
    /// let sql_df = ctx.execute("SELECT * FROM df").unwrap().collect().unwrap();
    /// assert!(sql_df.equals(&df));
    /// # }
    ///```
    pub fn execute(&mut self, query: &str) -> PolarsResult<LazyFrame> {
        let mut parser = Parser::new(&GenericDialect);
        parser = parser.with_options(ParserOptions {
            trailing_commas: true,
            ..Default::default()
        });

        let ast = parser
            .try_with_sql(query)
            .map_err(to_sql_interface_err)?
            .parse_statements()
            .map_err(to_sql_interface_err)?;

        polars_ensure!(ast.len() == 1, SQLInterface: "one (and only one) statement can be parsed at a time");
        let res = self.execute_statement(ast.first().unwrap())?;

        // Ensure the result uses the proper arenas.
        // This will instantiate new arenas with a new version.
        let lp_arena = std::mem::take(&mut self.lp_arena);
        let expr_arena = std::mem::take(&mut self.expr_arena);
        res.set_cached_arena(lp_arena, expr_arena);

        // Every execution should clear the statement-level maps.
        self.cte_map.clear();
        self.table_aliases.clear();
        self.joined_aliases.clear();
        self.named_windows.clear();

        Ok(res)
    }

    /// Add a function registry to the SQLContext.
    /// The registry provides the ability to add custom functions to the SQLContext.
    pub fn with_function_registry(mut self, function_registry: Arc<dyn FunctionRegistry>) -> Self {
        self.function_registry = function_registry;
        self
    }

    /// Get the function registry of the SQLContext
    pub fn registry(&self) -> &Arc<dyn FunctionRegistry> {
        &self.function_registry
    }

    /// Get a mutable reference to the function registry of the SQLContext
    pub fn registry_mut(&mut self) -> &mut dyn FunctionRegistry {
        Arc::get_mut(&mut self.function_registry).unwrap()
    }
}

impl SQLContext {
    pub(crate) fn execute_statement(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
        let ast = stmt;
        Ok(match ast {
            Statement::Query(query) => self.execute_query(query)?,
            stmt @ Statement::ShowTables { .. } => self.execute_show_tables(stmt)?,
            stmt @ Statement::CreateTable { .. } => self.execute_create_table(stmt)?,
            stmt @ Statement::Drop {
                object_type: ObjectType::Table,
                ..
            } => self.execute_drop_table(stmt)?,
            stmt @ Statement::Explain { .. } => self.execute_explain(stmt)?,
            stmt @ Statement::Truncate { .. } => self.execute_truncate_table(stmt)?,
            stmt @ Statement::Delete { .. } => self.execute_delete_from_table(stmt)?,
            _ => polars_bail!(
                SQLInterface: "statement type is not supported:\n{:?}", ast,
            ),
        })
    }

    pub(crate) fn execute_query(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
        self.register_ctes(query)?;
        self.execute_query_no_ctes(query)
    }

    pub(crate) fn execute_query_no_ctes(&mut self, query: &Query) -> PolarsResult<LazyFrame> {
        let lf = self.process_query(&query.body, query)?;
        self.process_limit_offset(lf, &query.limit, &query.offset)
    }

    pub(crate) fn get_frame_schema(&mut self, frame: &mut LazyFrame) -> PolarsResult<SchemaRef> {
        frame.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
    }

    pub(super) fn get_table_from_current_scope(&self, name: &str) -> Option<LazyFrame> {
        // Resolve the table name in the current scope; multi-stage fallback
        // * table name → cte name
        // * table alias → cte alias
        self.table_map
            .get(name)
            .or_else(|| self.cte_map.get(name))
            .or_else(|| {
                self.table_aliases.get(name).and_then(|alias| {
                    self.table_map
                        .get(alias.as_str())
                        .or_else(|| self.cte_map.get(alias.as_str()))
                })
            })
            .cloned()
    }

    /// Execute a query in an isolated context. This prevents subqueries from mutating
    /// arenas and other context state. Returns both the LazyFrame *and* its associated
    /// Schema (so that the correct arenas are used when determining schema).
    pub(crate) fn execute_isolated<F>(&mut self, query: F) -> PolarsResult<(LazyFrame, SchemaRef)>
    where
        F: FnOnce(&mut Self) -> PolarsResult<LazyFrame>,
    {
        // Save key state (arenas and lookups)
        let (joined_aliases, table_aliases, lp_arena, expr_arena, table_map) = (
            // "take" to ensure subqueries start with clean state
            std::mem::take(&mut self.joined_aliases),
            std::mem::take(&mut self.table_aliases),
            std::mem::take(&mut self.lp_arena),
            std::mem::take(&mut self.expr_arena),
            // "clone" to allow subqueries to see registered tables
            self.table_map.clone(),
        );

        // Execute query with clean state (eg: nested/subquery)
        let mut lf = query(self)?;
        let schema = self.get_frame_schema(&mut lf)?;

        // Restore saved state
        lf.set_cached_arena(
            std::mem::replace(&mut self.lp_arena, lp_arena),
            std::mem::replace(&mut self.expr_arena, expr_arena),
        );
        self.joined_aliases = joined_aliases;
        self.table_aliases = table_aliases;
        self.table_map = table_map;

        Ok((lf, schema))
    }

    fn expr_or_ordinal(
        &mut self,
        e: &SQLExpr,
        exprs: &[Expr],
        selected: Option<&[Expr]>,
        schema: Option<&Schema>,
        clause: &str,
    ) -> PolarsResult<Expr> {
        match e {
            SQLExpr::UnaryOp {
                op: UnaryOperator::Minus,
                expr,
            } if matches!(**expr, SQLExpr::Value(SQLValue::Number(_, _))) => {
                if let SQLExpr::Value(SQLValue::Number(ref idx, _)) = **expr {
                    Err(polars_err!(
                    SQLSyntax:
                    "negative ordinal values are invalid for {}; found -{}",
                    clause,
                    idx
                    ))
                } else {
                    unreachable!()
                }
            },
            SQLExpr::Value(SQLValue::Number(idx, _)) => {
                // note: sql queries are 1-indexed
                let idx = idx.parse::<usize>().map_err(|_| {
                    polars_err!(
                        SQLSyntax:
                        "negative ordinal values are invalid for {}; found {}",
                        clause,
                        idx
                    )
                })?;
                // note: "selected" cols represent final projection order, so we use those for
                // ordinal resolution. "exprs" may include cols that are subsequently dropped.
                let cols = if let Some(cols) = selected {
                    cols
                } else {
                    exprs
                };
                Ok(cols
                    .get(idx - 1)
                    .ok_or_else(|| {
                        polars_err!(
                            SQLInterface:
                            "{} ordinal value must refer to a valid column; found {}",
                            clause,
                            idx
                        )
                    })?
                    .clone())
            },
            SQLExpr::Value(v) => Err(polars_err!(
                SQLSyntax:
                "{} requires a valid expression or positive ordinal; found {}", clause, v,
            )),
            _ => parse_sql_expr(e, self, schema),
        }
    }

    pub(super) fn resolve_name(&self, tbl_name: &str, column_name: &str) -> String {
        if let Some(aliases) = self.joined_aliases.get(tbl_name) {
            if let Some(name) = aliases.get(column_name) {
                return name.to_string();
            }
        }
        column_name.to_string()
    }

    fn process_query(&mut self, expr: &SetExpr, query: &Query) -> PolarsResult<LazyFrame> {
        match expr {
            SetExpr::Select(select_stmt) => self.execute_select(select_stmt, query),
            SetExpr::Query(query) => self.execute_query_no_ctes(query),
            SetExpr::SetOperation {
                op: SetOperator::Union,
                set_quantifier,
                left,
                right,
            } => self.process_union(left, right, set_quantifier, query),

            #[cfg(feature = "semi_anti_join")]
            SetExpr::SetOperation {
                op: SetOperator::Intersect | SetOperator::Except,
                set_quantifier,
                left,
                right,
            } => self.process_except_intersect(left, right, set_quantifier, query),

            SetExpr::Values(Values {
                explicit_row: _,
                rows,
            }) => self.process_values(rows),

            SetExpr::Table(tbl) => {
                if tbl.table_name.is_some() {
                    let table_name = tbl.table_name.as_ref().unwrap();
                    self.get_table_from_current_scope(table_name)
                        .ok_or_else(|| {
                            polars_err!(
                                SQLInterface: "no table or alias named '{}' found",
                                tbl
                            )
                        })
                } else {
                    polars_bail!(SQLInterface: "'TABLE' requires valid table name")
                }
            },
            op => {
                polars_bail!(SQLInterface: "'{}' operation is currently unsupported", op)
            },
        }
    }

    #[cfg(feature = "semi_anti_join")]
    fn process_except_intersect(
        &mut self,
        left: &SetExpr,
        right: &SetExpr,
        quantifier: &SetQuantifier,
        query: &Query,
    ) -> PolarsResult<LazyFrame> {
        let (join_type, op_name) = match *query.body {
            SetExpr::SetOperation {
                op: SetOperator::Except,
                ..
            } => (JoinType::Anti, "EXCEPT"),
            _ => (JoinType::Semi, "INTERSECT"),
        };
        let mut lf = self.process_query(left, query)?;
        let mut rf = self.process_query(right, query)?;
        let lf_schema = self.get_frame_schema(&mut lf)?;

        let lf_cols: Vec<_> = lf_schema.iter_names_cloned().map(col).collect();
        let rf_cols = match quantifier {
            SetQuantifier::ByName => None,
            SetQuantifier::Distinct | SetQuantifier::None => {
                let rf_schema = self.get_frame_schema(&mut rf)?;
                let rf_cols: Vec<_> = rf_schema.iter_names_cloned().map(col).collect();
                if lf_cols.len() != rf_cols.len() {
                    polars_bail!(SQLInterface: "{} requires equal number of columns in each table (use '{} BY NAME' to combine mismatched tables)", op_name, op_name)
                }
                Some(rf_cols)
            },
            _ => {
                polars_bail!(SQLInterface: "'{} {}' is not supported", op_name, quantifier.to_string())
            },
        };
        let join = lf.join_builder().with(rf).how(join_type).join_nulls(true);
        let joined_tbl = match rf_cols {
            Some(rf_cols) => join.left_on(lf_cols).right_on(rf_cols).finish(),
            None => join.on(lf_cols).finish(),
        };
        Ok(joined_tbl.unique(None, UniqueKeepStrategy::Any))
    }

    fn process_union(
        &mut self,
        left: &SetExpr,
        right: &SetExpr,
        quantifier: &SetQuantifier,
        query: &Query,
    ) -> PolarsResult<LazyFrame> {
        let mut lf = self.process_query(left, query)?;
        let mut rf = self.process_query(right, query)?;
        let opts = UnionArgs {
            parallel: true,
            to_supertypes: true,
            ..Default::default()
        };
        match quantifier {
            // UNION [ALL | DISTINCT]
            SetQuantifier::All | SetQuantifier::Distinct | SetQuantifier::None => {
                let lf_schema = self.get_frame_schema(&mut lf)?;
                let rf_schema = self.get_frame_schema(&mut rf)?;
                if lf_schema.len() != rf_schema.len() {
                    polars_bail!(SQLInterface: "UNION requires equal number of columns in each table (use 'UNION BY NAME' to combine mismatched tables)")
                }
                // rename `rf` columns to match `lf` if they differ; SQL behaves
                // positionally on UNION ops (unless using the "BY NAME" qualifier)
                if lf_schema.iter_names().ne(rf_schema.iter_names()) {
                    rf = rf.rename(rf_schema.iter_names(), lf_schema.iter_names(), true);
                }
                let concatenated = concat(vec![lf, rf], opts);
                match quantifier {
                    SetQuantifier::Distinct | SetQuantifier::None => {
                        concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
                    },
                    _ => concatenated,
                }
            },
            // UNION ALL BY NAME
            #[cfg(feature = "diagonal_concat")]
            SetQuantifier::AllByName => concat_lf_diagonal(vec![lf, rf], opts),
            // UNION [DISTINCT] BY NAME
            #[cfg(feature = "diagonal_concat")]
            SetQuantifier::ByName | SetQuantifier::DistinctByName => {
                let concatenated = concat_lf_diagonal(vec![lf, rf], opts);
                concatenated.map(|lf| lf.unique(None, UniqueKeepStrategy::Any))
            },
            #[allow(unreachable_patterns)]
            _ => polars_bail!(SQLInterface: "'UNION {}' is not currently supported", quantifier),
        }
    }

    fn process_values(&mut self, values: &[Vec<SQLExpr>]) -> PolarsResult<LazyFrame> {
        let frame_rows: Vec<Row> = values.iter().map(|row| {
            let row_data: Result<Vec<_>, _> = row.iter().map(|expr| {
                let expr = parse_sql_expr(expr, self, None)?;
                match expr {
                    Expr::Literal(value) => {
                        value.to_any_value()
                            .ok_or_else(|| polars_err!(SQLInterface: "invalid literal value: {:?}", value))
                            .map(|av| av.into_static())
                    },
                    _ => polars_bail!(SQLInterface: "VALUES clause expects literals; found {}", expr),
                }
            }).collect();
            row_data.map(Row::new)
        }).collect::<Result<_, _>>()?;

        Ok(DataFrame::from_rows(frame_rows.as_ref())?.lazy())
    }

    // EXPLAIN SELECT * FROM DF
    fn execute_explain(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
        match stmt {
            Statement::Explain { statement, .. } => {
                let lf = self.execute_statement(statement)?;
                let plan = lf.describe_optimized_plan()?;
                let plan = plan
                    .split('\n')
                    .collect::<Series>()
                    .with_name(PlSmallStr::from_static("Logical Plan"))
                    .into_column();
                let df = DataFrame::new(vec![plan])?;
                Ok(df.lazy())
            },
            _ => polars_bail!(SQLInterface: "unexpected statement type; expected EXPLAIN"),
        }
    }

    // SHOW TABLES
    fn execute_show_tables(&mut self, _: &Statement) -> PolarsResult<LazyFrame> {
        let tables = Column::new("name".into(), self.get_tables());
        let df = DataFrame::new(vec![tables])?;
        Ok(df.lazy())
    }

    // DROP TABLE <tbl>
    fn execute_drop_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
        match stmt {
            Statement::Drop { names, .. } => {
                names.iter().for_each(|name| {
                    self.table_map.remove(&name.to_string());
                });
                Ok(DataFrame::empty().lazy())
            },
            _ => polars_bail!(SQLInterface: "unexpected statement type; expected DROP"),
        }
    }

    // DELETE FROM <tbl> [WHERE ...]
    fn execute_delete_from_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
        if let Statement::Delete(Delete {
            tables,
            from,
            using,
            selection,
            returning,
            order_by,
            limit,
        }) = stmt
        {
            if !tables.is_empty()
                || using.is_some()
                || returning.is_some()
                || limit.is_some()
                || !order_by.is_empty()
            {
                let error_message = match () {
                    _ if !tables.is_empty() => "DELETE expects exactly one table name",
                    _ if using.is_some() => "DELETE does not support the USING clause",
                    _ if returning.is_some() => "DELETE does not support the RETURNING clause",
                    _ if limit.is_some() => "DELETE does not support the LIMIT clause",
                    _ if !order_by.is_empty() => "DELETE does not support the ORDER BY clause",
                    _ => unreachable!(),
                };
                polars_bail!(SQLInterface: error_message);
            }
            let from_tables = match &from {
                FromTable::WithFromKeyword(from) => from,
                FromTable::WithoutKeyword(from) => from,
            };
            if from_tables.len() > 1 {
                polars_bail!(SQLInterface: "cannot have multiple tables in DELETE FROM (found {})", from_tables.len())
            }
            let tbl_expr = from_tables.first().unwrap();
            if !tbl_expr.joins.is_empty() {
                polars_bail!(SQLInterface: "DELETE does not support table JOINs")
            }
            let (_, mut lf) = self.get_table(&tbl_expr.relation)?;
            if selection.is_none() {
                // no WHERE clause; equivalent to TRUNCATE (drop all rows)
                Ok(DataFrame::empty_with_schema(
                    lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
                        .unwrap()
                        .as_ref(),
                )
                .lazy())
            } else {
                // apply constraint as inverted filter (drops rows matching the selection)
                Ok(self.process_where(lf.clone(), selection, true, None)?)
            }
        } else {
            polars_bail!(SQLInterface: "unexpected statement type; expected DELETE")
        }
    }

    // TRUNCATE <tbl>
    fn execute_truncate_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
        if let Statement::Truncate {
            table_names,
            partitions,
            ..
        } = stmt
        {
            match partitions {
                None => {
                    if table_names.len() != 1 {
                        polars_bail!(SQLInterface: "TRUNCATE expects exactly one table name; found {}", table_names.len())
                    }
                    let tbl = table_names[0].to_string();
                    if let Some(lf) = self.table_map.get_mut(&tbl) {
                        *lf = DataFrame::empty_with_schema(
                            lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
                                .unwrap()
                                .as_ref(),
                        )
                        .lazy();
                        Ok(lf.clone())
                    } else {
                        polars_bail!(SQLInterface: "table '{}' does not exist", tbl);
                    }
                },
                _ => {
                    polars_bail!(SQLInterface: "TRUNCATE does not support use of 'partitions'")
                },
            }
        } else {
            polars_bail!(SQLInterface: "unexpected statement type; expected TRUNCATE")
        }
    }

    fn register_cte(&mut self, name: &str, lf: LazyFrame) {
        self.cte_map.insert(name.to_owned(), lf);
    }

    fn register_ctes(&mut self, query: &Query) -> PolarsResult<()> {
        if let Some(with) = &query.with {
            if with.recursive {
                polars_bail!(SQLInterface: "recursive CTEs are not supported")
            }
            for cte in &with.cte_tables {
                let cte_name = cte.alias.name.value.clone();
                let mut lf = self.execute_query(&cte.query)?;
                lf = self.rename_columns_from_table_alias(lf, &cte.alias)?;
                self.register_cte(&cte_name, lf);
            }
        }
        Ok(())
    }

    fn register_named_windows(
        &mut self,
        named_windows: &[NamedWindowDefinition],
    ) -> PolarsResult<()> {
        for NamedWindowDefinition(name, expr) in named_windows {
            let spec = match expr {
                NamedWindowExpr::NamedWindow(ref_name) => self
                    .named_windows
                    .get(&ref_name.value)
                    .ok_or_else(|| {
                        polars_err!(
                            SQLInterface:
                            "named window '{}' references undefined window '{}'",
                            name.value, ref_name.value
                        )
                    })?
                    .clone(),
                NamedWindowExpr::WindowSpec(spec) => spec.clone(),
            };
            self.named_windows.insert(name.value.clone(), spec);
        }
        Ok(())
    }

    /// execute the 'FROM' part of the query
    fn execute_from_statement(&mut self, tbl_expr: &TableWithJoins) -> PolarsResult<LazyFrame> {
        let (l_name, mut lf) = self.get_table(&tbl_expr.relation)?;
        if !tbl_expr.joins.is_empty() {
            for join in &tbl_expr.joins {
                let (r_name, mut rf) = self.get_table(&join.relation)?;
                if r_name.is_empty() {
                    // Require non-empty to avoid duplicate column errors from nested self-joins.
                    polars_bail!(
                        SQLInterface:
                        "cannot join on unnamed relation; please provide an alias"
                    )
                }
                let left_schema = self.get_frame_schema(&mut lf)?;
                let right_schema = self.get_frame_schema(&mut rf)?;

                lf = match &join.join_operator {
                    op @ (JoinOperator::FullOuter(constraint)
                    | JoinOperator::LeftOuter(constraint)
                    | JoinOperator::RightOuter(constraint)
                    | JoinOperator::Inner(constraint)
                    | JoinOperator::Anti(constraint)
                    | JoinOperator::Semi(constraint)
                    | JoinOperator::LeftAnti(constraint)
                    | JoinOperator::LeftSemi(constraint)
                    | JoinOperator::RightAnti(constraint)
                    | JoinOperator::RightSemi(constraint)) => {
                        let (lf, rf) = match op {
                            JoinOperator::RightAnti(_) | JoinOperator::RightSemi(_) => (rf, lf),
                            _ => (lf, rf),
                        };
                        self.process_join(
                            &TableInfo {
                                frame: lf,
                                name: (&l_name).into(),
                                schema: left_schema.clone(),
                            },
                            &TableInfo {
                                frame: rf,
                                name: (&r_name).into(),
                                schema: right_schema.clone(),
                            },
                            constraint,
                            match op {
                                JoinOperator::FullOuter(_) => JoinType::Full,
                                JoinOperator::LeftOuter(_) => JoinType::Left,
                                JoinOperator::RightOuter(_) => JoinType::Right,
                                JoinOperator::Inner(_) => JoinType::Inner,
                                #[cfg(feature = "semi_anti_join")]
                                JoinOperator::Anti(_)
                                | JoinOperator::LeftAnti(_)
                                | JoinOperator::RightAnti(_) => JoinType::Anti,
                                #[cfg(feature = "semi_anti_join")]
                                JoinOperator::Semi(_)
                                | JoinOperator::LeftSemi(_)
                                | JoinOperator::RightSemi(_) => JoinType::Semi,
                                join_type => polars_bail!(
                                    SQLInterface:
                                    "join type '{:?}' not currently supported",
                                    join_type
                                ),
                            },
                        )?
                    },
                    JoinOperator::CrossJoin => {
                        lf.cross_join(rf, Some(format_pl_smallstr!(":{}", r_name)))
                    },
                    join_type => {
                        polars_bail!(SQLInterface: "join type '{:?}' not currently supported", join_type)
                    },
                };

                // track join-aliased columns so we can resolve them later
                let joined_schema = self.get_frame_schema(&mut lf)?;

                self.joined_aliases.insert(
                    r_name.clone(),
                    right_schema
                        .iter_names()
                        .filter_map(|name| {
                            // col exists in both tables and is aliased in the joined result
                            let aliased_name = format!("{name}:{r_name}");
                            if left_schema.contains(name)
                                && joined_schema.contains(aliased_name.as_str())
                            {
                                Some((name.to_string(), aliased_name))
                            } else {
                                None
                            }
                        })
                        .collect::<PlHashMap<String, String>>(),
                );
            }
        };
        Ok(lf)
    }

    /// Check that the SELECT statement only contains supported clauses.
    fn validate_select(&self, select_stmt: &Select) -> PolarsResult<()> {
        // Destructure "Select" exhaustively; that way if/when new fields are added in
        // future sqlparser versions, we'll get a compilation error and can handle them
        let Select {
            // Supported clauses
            distinct: _,
            from: _,
            group_by: _,
            having: _,
            named_window: _,
            projection: _,
            selection: _,

            // Metadata/token fields (can ignore)
            select_token: _,
            top_before_distinct: _,
            window_before_qualify: _,

            // Unsupported clauses
            ref cluster_by,
            ref connect_by,
            ref distribute_by,
            ref into,
            ref lateral_views,
            ref prewhere,
            ref qualify,
            ref sort_by,
            ref top,
            ref value_table_mode,
        } = *select_stmt;

        // Raise specific error messages for unsupported attributes
        polars_ensure!(cluster_by.is_empty(), SQLInterface: "`CLUSTER BY` clause is not supported");
        polars_ensure!(connect_by.is_none(), SQLInterface: "`CONNECT BY` clause is not supported");
        polars_ensure!(distribute_by.is_empty(), SQLInterface: "`DISTRIBUTE BY` clause is not supported");
        polars_ensure!(into.is_none(), SQLInterface: "`SELECT INTO`clause  is not supported");
        polars_ensure!(lateral_views.is_empty(), SQLInterface: "`LATERAL VIEW` clause is not supported");
        polars_ensure!(prewhere.is_none(), SQLInterface: "`PREWHERE` clause is not supported");
        polars_ensure!(qualify.is_none(), SQLInterface: "`QUALIFY` clause is not supported");
        polars_ensure!(sort_by.is_empty(), SQLInterface: "SORT BY` clause is not supported; use `ORDER BY` instead");
        polars_ensure!(top.is_none(), SQLInterface: "`TOP` clause is not supported; use `LIMIT` instead");
        polars_ensure!(value_table_mode.is_none(), SQLInterface: "`SELECT AS VALUE/STRUCT` is not supported");

        Ok(())
    }

    /// Execute the 'SELECT' part of the query.
    fn execute_select(&mut self, select_stmt: &Select, query: &Query) -> PolarsResult<LazyFrame> {
        // Check that the statement doesn't contain unsupported SELECT clauses
        self.validate_select(select_stmt)?;

        // Parse named windows first, as they may be referenced in the SELECT clause
        self.register_named_windows(&select_stmt.named_window)?;

        let mut lf = if select_stmt.from.is_empty() {
            DataFrame::empty().lazy()
        } else {
            // Note: implicit joins need more work to support properly,
            // explicit joins are preferred for now (ref: #16662)
            let from = select_stmt.clone().from;
            if from.len() > 1 {
                polars_bail!(SQLInterface: "multiple tables in FROM clause are not currently supported (found {}); use explicit JOIN syntax instead", from.len())
            }
            self.execute_from_statement(from.first().unwrap())?
        };

        // Filter expression (WHERE clause)
        let schema = self.get_frame_schema(&mut lf)?;
        lf = self.process_where(lf, &select_stmt.selection, false, Some(schema.clone()))?;

        // 'SELECT *' modifiers
        let mut select_modifiers = SelectModifiers {
            ilike: None,
            exclude: PlHashSet::new(),
            rename: PlHashMap::new(),
            replace: vec![],
        };

        let mut projections =
            self.column_projections(select_stmt, &schema, &mut select_modifiers)?;

        // Apply UNNEST (explode) at the frame level to ensure that
        // we maintain row-coherence of the exploded result(s)
        let mut explode_names = Vec::with_capacity(projections.len());
        for expr in &projections {
            for e in expr {
                if let Expr::Explode { input, .. } = e {
                    if let Expr::Column(name) = input.as_ref() {
                        explode_names.push(name.clone());
                    }
                }
            }
        }
        if !explode_names.is_empty() {
            explode_names.dedup();
            lf = lf.explode(
                Selector::ByName {
                    names: Arc::from(explode_names),
                    strict: true,
                },
                ExplodeOptions {
                    empty_as_null: true,
                    keep_nulls: true,
                },
            );
            projections = projections
                .into_iter()
                .map(|p| {
                    p.map_expr(|e| match e {
                        Expr::Explode { input, .. } => input.as_ref().clone(),
                        _ => e,
                    })
                })
                .collect();
        }

        // Check for "GROUP BY ..." (after determining projections)
        let mut group_by_keys: Vec<Expr> = Vec::new();
        match &select_stmt.group_by {
            // Standard "GROUP BY x, y, z" syntax (also recognising ordinal values)
            GroupByExpr::Expressions(group_by_exprs, modifiers) => {
                if !modifiers.is_empty() {
                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
                }
                // translate the group expressions, allowing ordinal values
                group_by_keys = group_by_exprs
                    .iter()
                    .map(|e| {
                        self.expr_or_ordinal(
                            e,
                            &projections,
                            None,
                            Some(schema.deref()),
                            "GROUP BY",
                        )
                    })
                    .collect::<PolarsResult<_>>()?
            },
            // "GROUP BY ALL" syntax; automatically adds expressions that do not contain
            // nested agg/window funcs to the group key (also ignores literals).
            GroupByExpr::All(modifiers) => {
                if !modifiers.is_empty() {
                    polars_bail!(SQLInterface: "GROUP BY does not support CUBE, ROLLUP, or TOTALS modifiers")
                }
                projections.iter().for_each(|expr| match expr {
                    // immediately match the most common cases (col|agg|len|lit, optionally aliased).
                    Expr::Agg(_) | Expr::Len | Expr::Literal(_) => (),
                    Expr::Column(_) => group_by_keys.push(expr.clone()),
                    Expr::Alias(e, _)
                        if matches!(&**e, Expr::Agg(_) | Expr::Len | Expr::Literal(_)) => {},
                    Expr::Alias(e, _) if matches!(&**e, Expr::Column(_)) => {
                        if let Expr::Column(name) = &**e {
                            group_by_keys.push(col(name.clone()));
                        }
                    },
                    _ => {
                        // If not quick-matched, add if no nested agg/window expressions
                        if !has_expr(expr, |e| {
                            matches!(e, Expr::Agg(_))
                                || matches!(e, Expr::Len)
                                || matches!(e, Expr::Over { .. })
                                || {
                                    #[cfg(feature = "dynamic_group_by")]
                                    {
                                        matches!(e, Expr::Rolling { .. })
                                    }
                                    #[cfg(not(feature = "dynamic_group_by"))]
                                    {
                                        false
                                    }
                                }
                        }) {
                            group_by_keys.push(expr.clone())
                        }
                    },
                });
            },
        };

        lf = if group_by_keys.is_empty() {
            // The 'having' clause is only valid inside 'group by'
            if select_stmt.having.is_some() {
                polars_bail!(SQLSyntax: "HAVING clause not valid outside of GROUP BY; found:\n{:?}", select_stmt.having);
            };

            // Final/selected cols, accounting for 'SELECT *' modifiers
            let mut retained_cols = Vec::with_capacity(projections.len());
            let mut retained_names = Vec::with_capacity(projections.len());
            let have_order_by = query.order_by.is_some();
            // Initialize containing InheritsContext to handle empty projection case.
            let mut projection_heights = ExprSqlProjectionHeightBehavior::InheritsContext;

            // Note: if there is an 'order by' then we project everything (original cols
            // and new projections) and *then* select the final cols; the retained cols
            // are used to ensure a correct final projection. If there's no 'order by',
            // clause then we can project the final column *expressions* directly.
            for p in projections.iter() {
                let name = p.to_field(schema.deref())?.name.to_string();
                if select_modifiers.matches_ilike(&name)
                    && !select_modifiers.exclude.contains(&name)
                {
                    projection_heights |= ExprSqlProjectionHeightBehavior::identify_from_expr(p);

                    retained_cols.push(if have_order_by {
                        col(name.as_str())
                    } else {
                        p.clone()
                    });
                    retained_names.push(col(name));
                }
            }

            // Apply the remaining modifiers and establish the final projection
            if have_order_by {
                // We can safely use `with_columns()` and avoid a join if:
                // * There is already a projection that projects to the table height.
                // * All projection heights inherit from context (e.g. all scalar literals that
                //   are to be broadcasted to table height).
                if projection_heights.contains(ExprSqlProjectionHeightBehavior::MaintainsColumn)
                    || projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
                {
                    lf = lf.with_columns(projections);
                } else {
                    // We hit this branch if the output height is not guaranteed to match the table
                    // height. E.g.:
                    //
                    // * SELECT COUNT(*) FROM df ORDER BY sort_key;
                    //
                    // For these cases we truncate / extend the sorting columns with NULLs to match
                    // the output height. We do this by projecting independently and then joining
                    // back the original frame on the row index.
                    const NAME: PlSmallStr = PlSmallStr::from_static("__PL_INDEX");
                    lf = lf
                        .clone()
                        .select(projections)
                        .with_row_index(NAME, None)
                        .join(
                            lf.with_row_index(NAME, None),
                            [col(NAME)],
                            [col(NAME)],
                            JoinArgs {
                                how: JoinType::Left,
                                validation: Default::default(),
                                suffix: None,
                                slice: None,
                                nulls_equal: false,
                                coalesce: Default::default(),
                                maintain_order: polars_ops::frame::MaintainOrderJoin::Left,
                            },
                        );
                }
            }

            if !select_modifiers.replace.is_empty() {
                lf = lf.with_columns(&select_modifiers.replace);
            }
            if !select_modifiers.rename.is_empty() {
                lf = lf.with_columns(select_modifiers.renamed_cols());
            }

            lf = self.process_order_by(lf, &query.order_by, Some(&retained_cols))?;

            // Note: If `have_order_by`, with_columns is already done above.
            if projection_heights == ExprSqlProjectionHeightBehavior::InheritsContext
                && !have_order_by
            {
                // All projections need to be broadcasted to table height, so evaluate in `with_columns()`
                lf = lf.with_columns(retained_cols).select(retained_names);
            } else {
                lf = lf.select(retained_cols);
            }

            if !select_modifiers.rename.is_empty() {
                lf = lf.rename(
                    select_modifiers.rename.keys(),
                    select_modifiers.rename.values(),
                    true,
                );
            };
            lf
        } else {
            lf = self.process_group_by(lf, &group_by_keys, &projections)?;
            lf = self.process_order_by(lf, &query.order_by, None)?;

            // Apply optional 'having' clause, post-aggregation.
            let schema = Some(self.get_frame_schema(&mut lf)?);
            match select_stmt.having.as_ref() {
                Some(expr) => lf.filter(parse_sql_expr(expr, self, schema.as_deref())?),
                None => lf,
            }
        };

        // Apply optional DISTINCT clause.
        lf = match &select_stmt.distinct {
            Some(Distinct::Distinct) => lf.unique_stable(None, UniqueKeepStrategy::Any),
            Some(Distinct::On(exprs)) => {
                // TODO: support exprs in `unique` see https://github.com/pola-rs/polars/issues/5760
                let schema = Some(self.get_frame_schema(&mut lf)?);
                let cols = exprs
                    .iter()
                    .map(|e| {
                        let expr = parse_sql_expr(e, self, schema.as_deref())?;
                        if let Expr::Column(name) = expr {
                            Ok(name)
                        } else {
                            Err(polars_err!(SQLSyntax:"DISTINCT ON only supports column names"))
                        }
                    })
                    .collect::<PolarsResult<Vec<_>>>()?;

                // DISTINCT ON has to apply the ORDER BY before the operation.
                lf = self.process_order_by(lf, &query.order_by, None)?;
                return Ok(lf.unique_stable(
                    Some(Selector::ByName {
                        names: cols.into(),
                        strict: true,
                    }),
                    UniqueKeepStrategy::First,
                ));
            },
            None => lf,
        };
        Ok(lf)
    }

    fn column_projections(
        &mut self,
        select_stmt: &Select,
        schema: &SchemaRef,
        select_modifiers: &mut SelectModifiers,
    ) -> PolarsResult<Vec<Expr>> {
        let parsed_items: PolarsResult<Vec<Vec<Expr>>> = select_stmt
            .projection
            .iter()
            .map(|select_item| match select_item {
                SelectItem::UnnamedExpr(expr) => {
                    Ok(vec![parse_sql_expr(expr, self, Some(schema))?])
                },
                SelectItem::ExprWithAlias { expr, alias } => {
                    let expr = parse_sql_expr(expr, self, Some(schema))?;
                    Ok(vec![expr.alias(PlSmallStr::from_str(alias.value.as_str()))])
                },
                SelectItem::QualifiedWildcard(obj_name, wildcard_options) => self
                    .process_qualified_wildcard(
                        obj_name,
                        wildcard_options,
                        select_modifiers,
                        Some(schema),
                    ),
                SelectItem::Wildcard(wildcard_options) => {
                    let cols = schema
                        .iter_names()
                        .map(|name| col(name.clone()))
                        .collect::<Vec<_>>();

                    self.process_wildcard_additional_options(
                        cols,
                        wildcard_options,
                        select_modifiers,
                        Some(schema),
                    )
                },
            })
            .collect();

        let flattened_exprs: Vec<Expr> = parsed_items?
            .into_iter()
            .flatten()
            .flat_map(|expr| expand_exprs(expr, schema))
            .collect();

        Ok(flattened_exprs)
    }

    fn process_where(
        &mut self,
        mut lf: LazyFrame,
        expr: &Option<SQLExpr>,
        invert_filter: bool,
        schema: Option<SchemaRef>,
    ) -> PolarsResult<LazyFrame> {
        if let Some(expr) = expr {
            let schema = match schema {
                None => self.get_frame_schema(&mut lf)?,
                Some(s) => s,
            };

            // shortcut filter evaluation if given expression is just TRUE or FALSE
            let (all_true, all_false) = match expr {
                SQLExpr::Value(SQLValue::Boolean(b)) => (*b, !*b),
                SQLExpr::BinaryOp { left, op, right } => match (&**left, &**right, op) {
                    (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::Eq) => (a == b, a != b),
                    (SQLExpr::Value(a), SQLExpr::Value(b), BinaryOperator::NotEq) => {
                        (a != b, a == b)
                    },
                    _ => (false, false),
                },
                _ => (false, false),
            };
            if (all_true && !invert_filter) || (all_false && invert_filter) {
                return Ok(lf);
            } else if (all_false && !invert_filter) || (all_true && invert_filter) {
                return Ok(DataFrame::empty_with_schema(schema.as_ref()).lazy());
            }

            // ...otherwise parse and apply the filter as normal
            let mut filter_expression = parse_sql_expr(expr, self, Some(schema).as_deref())?;
            if filter_expression.clone().meta().has_multiple_outputs() {
                filter_expression = all_horizontal([filter_expression])?;
            }
            lf = self.process_subqueries(lf, vec![&mut filter_expression]);
            lf = if invert_filter {
                lf.remove(filter_expression)
            } else {
                lf.filter(filter_expression)
            };
        }
        Ok(lf)
    }

    pub(super) fn process_join(
        &mut self,
        tbl_left: &TableInfo,
        tbl_right: &TableInfo,
        constraint: &JoinConstraint,
        join_type: JoinType,
    ) -> PolarsResult<LazyFrame> {
        let (left_on, right_on) = process_join_constraint(constraint, tbl_left, tbl_right, self)?;
        let coalesce_type = match constraint {
            // "NATURAL" joins should coalesce; otherwise we disambiguate
            JoinConstraint::Natural => JoinCoalesce::CoalesceColumns,
            _ => JoinCoalesce::KeepColumns,
        };
        let joined = tbl_left
            .frame
            .clone()
            .join_builder()
            .with(tbl_right.frame.clone())
            .left_on(left_on)
            .right_on(right_on)
            .how(join_type)
            .suffix(format!(":{}", tbl_right.name))
            .coalesce(coalesce_type)
            .finish();

        Ok(joined)
    }

    fn process_subqueries(&self, lf: LazyFrame, exprs: Vec<&mut Expr>) -> LazyFrame {
        let mut contexts = vec![];
        for expr in exprs {
            *expr = expr.clone().map_expr(|e| match e {
                Expr::SubPlan(lp, names) => {
                    contexts.push(<LazyFrame>::from((**lp).clone()));
                    if names.len() == 1 {
                        Expr::Column(names[0].as_str().into())
                    } else {
                        Expr::SubPlan(lp, names)
                    }
                },
                e => e,
            })
        }
        if contexts.is_empty() {
            lf
        } else {
            lf.with_context(contexts)
        }
    }

    fn execute_create_table(&mut self, stmt: &Statement) -> PolarsResult<LazyFrame> {
        if let Statement::CreateTable(CreateTable {
            if_not_exists,
            name,
            query,
            columns,
            like,
            ..
        }) = stmt
        {
            let tbl_name = name.0.first().unwrap().value.as_str();
            if *if_not_exists && self.table_map.contains_key(tbl_name) {
                polars_bail!(SQLInterface: "relation '{}' already exists", tbl_name);
            }
            let lf = match (query, columns.is_empty(), like) {
                (Some(query), true, None) => {
                    // ----------------------------------------------------
                    // CREATE TABLE [IF NOT EXISTS] <name> AS <query>
                    // ----------------------------------------------------
                    self.execute_query(query)?
                },
                (None, false, None) => {
                    // ----------------------------------------------------
                    // CREATE TABLE [IF NOT EXISTS] <name> (<coldef>, ...)
                    // ----------------------------------------------------
                    let mut schema = Schema::with_capacity(columns.len());
                    for col in columns {
                        let col_name = col.name.value.as_str();
                        let dtype = map_sql_dtype_to_polars(&col.data_type)?;
                        schema.insert_at_index(schema.len(), col_name.into(), dtype)?;
                    }
                    DataFrame::empty_with_schema(&schema).lazy()
                },
                (None, true, Some(from_name)) => {
                    // ----------------------------------------------------
                    // CREATE TABLE [IF NOT EXISTS] <name> LIKE <table>
                    // ----------------------------------------------------
                    let like_table = from_name.0.first().unwrap().value.as_str();
                    if let Some(mut table) = self.table_map.get(like_table).cloned() {
                        let schema = self.get_frame_schema(&mut table)?;
                        DataFrame::empty_with_schema(&schema).lazy()
                    } else {
                        polars_bail!(SQLInterface: "table given in LIKE does not exist: {}", like_table)
                    }
                },
                // No valid options provided
                (None, true, None) => {
                    polars_bail!(SQLInterface: "CREATE TABLE expected a query, column definitions, or LIKE clause")
                },
                // Mutually exclusive options
                _ => {
                    polars_bail!(
                        SQLInterface: "CREATE TABLE received mutually exclusive options:\nquery = {:?}\ncolumns = {:?}\nlike = {:?}",
                        query,
                        columns,
                        like,
                    )
                },
            };
            self.register(tbl_name, lf);

            let df_created =
                df! { "Response" => [format!("CREATE TABLE {}", name.0.first().unwrap().value)] };
            Ok(df_created.unwrap().lazy())
        } else {
            unreachable!()
        }
    }

    fn get_table(&mut self, relation: &TableFactor) -> PolarsResult<(String, LazyFrame)> {
        match relation {
            TableFactor::Table {
                name, alias, args, ..
            } => {
                if let Some(args) = args {
                    return self.execute_table_function(name, alias, &args.args);
                }
                let tbl_name = name.0.first().unwrap().value.as_str();
                if let Some(lf) = self.get_table_from_current_scope(tbl_name) {
                    match alias {
                        Some(alias) => {
                            self.table_aliases
                                .insert(alias.name.value.clone(), tbl_name.to_string());
                            Ok((alias.to_string(), lf))
                        },
                        None => Ok((tbl_name.to_string(), lf)),
                    }
                } else {
                    polars_bail!(SQLInterface: "relation '{}' was not found", tbl_name);
                }
            },
            TableFactor::Derived {
                lateral,
                subquery,
                alias,
            } => {
                polars_ensure!(!(*lateral), SQLInterface: "LATERAL not supported");
                if let Some(alias) = alias {
                    let mut lf = self.execute_query_no_ctes(subquery)?;
                    lf = self.rename_columns_from_table_alias(lf, alias)?;
                    self.table_map.insert(alias.name.value.clone(), lf.clone());
                    Ok((alias.name.value.clone(), lf))
                } else {
                    polars_bail!(SQLSyntax: "derived tables must have aliases");
                }
            },
            TableFactor::UNNEST {
                alias,
                array_exprs,
                with_offset,
                with_offset_alias: _,
                ..
            } => {
                if let Some(alias) = alias {
                    let column_names: Vec<Option<PlSmallStr>> = alias
                        .columns
                        .iter()
                        .map(|c| {
                            if c.name.value.is_empty() {
                                None
                            } else {
                                Some(PlSmallStr::from_str(c.name.value.as_str()))
                            }
                        })
                        .collect();

                    let column_values: Vec<Series> = array_exprs
                        .iter()
                        .map(|arr| parse_sql_array(arr, self))
                        .collect::<Result<_, _>>()?;

                    polars_ensure!(!column_names.is_empty(),
                        SQLSyntax:
                        "UNNEST table alias must also declare column names, eg: {} (a,b,c)", alias.name.to_string()
                    );
                    if column_names.len() != column_values.len() {
                        let plural = if column_values.len() > 1 { "s" } else { "" };
                        polars_bail!(
                            SQLSyntax:
                            "UNNEST table alias requires {} column name{}, found {}", column_values.len(), plural, column_names.len()
                        );
                    }
                    let column_series: Vec<Column> = column_values
                        .into_iter()
                        .zip(column_names)
                        .map(|(s, name)| {
                            if let Some(name) = name {
                                s.with_name(name)
                            } else {
                                s
                            }
                        })
                        .map(Column::from)
                        .collect();

                    let lf = DataFrame::new(column_series)?.lazy();

                    if *with_offset {
                        // TODO: support 'WITH ORDINALITY|OFFSET' modifier.
                        polars_bail!(SQLInterface: "UNNEST tables do not (yet) support WITH ORDINALITY|OFFSET");
                    }
                    let table_name = alias.name.value.clone();
                    self.table_map.insert(table_name.clone(), lf.clone());
                    Ok((table_name, lf))
                } else {
                    polars_bail!(SQLSyntax: "UNNEST table must have an alias");
                }
            },
            TableFactor::NestedJoin {
                table_with_joins,
                alias,
            } => {
                let lf = self.execute_from_statement(table_with_joins)?;
                match alias {
                    Some(a) => Ok((a.name.value.clone(), lf)),
                    None => Ok(("".to_string(), lf)),
                }
            },
            // Support bare table, optionally with an alias, for now
            _ => polars_bail!(SQLInterface: "not yet implemented: {}", relation),
        }
    }

    fn execute_table_function(
        &mut self,
        name: &ObjectName,
        alias: &Option<TableAlias>,
        args: &[FunctionArg],
    ) -> PolarsResult<(String, LazyFrame)> {
        let tbl_fn = name.0.first().unwrap().value.as_str();
        let read_fn = tbl_fn.parse::<PolarsTableFunctions>()?;
        let (tbl_name, lf) = read_fn.execute(args)?;
        #[allow(clippy::useless_asref)]
        let tbl_name = alias
            .as_ref()
            .map(|a| a.name.value.clone())
            .unwrap_or_else(|| tbl_name.to_str().to_string());

        self.table_map.insert(tbl_name.clone(), lf.clone());
        Ok((tbl_name, lf))
    }

    fn process_order_by(
        &mut self,
        mut lf: LazyFrame,
        order_by: &Option<OrderBy>,
        selected: Option<&[Expr]>,
    ) -> PolarsResult<LazyFrame> {
        if order_by.as_ref().is_none_or(|ob| ob.exprs.is_empty()) {
            return Ok(lf);
        }
        let schema = self.get_frame_schema(&mut lf)?;
        let columns_iter = schema.iter_names().map(|e| col(e.clone()));

        let order_by = order_by.as_ref().unwrap().exprs.clone();
        let mut descending = Vec::with_capacity(order_by.len());
        let mut nulls_last = Vec::with_capacity(order_by.len());
        let mut by: Vec<Expr> = Vec::with_capacity(order_by.len());

        if order_by.len() == 1  // support `ORDER BY ALL` (iff there is no column named 'ALL' in the schema)
            && matches!(&order_by[0].expr, SQLExpr::Identifier(ident) if ident.value.to_uppercase() == "ALL" && !schema.iter_names().any(|name| name.to_uppercase() == "ALL"))
        {
            if let Some(selected) = selected {
                by.extend(selected.iter().cloned());
            } else {
                by.extend(columns_iter);
            };
            let desc_order = !order_by[0].asc.unwrap_or(true);
            nulls_last.resize(by.len(), !order_by[0].nulls_first.unwrap_or(desc_order));
            descending.resize(by.len(), desc_order);
        } else {
            let columns = &columns_iter.collect::<Vec<_>>();
            for ob in order_by {
                // note: if not specified 'NULLS FIRST' is default for DESC, 'NULLS LAST' otherwise
                // https://www.postgresql.org/docs/current/queries-order.html
                let desc_order = !ob.asc.unwrap_or(true);
                nulls_last.push(!ob.nulls_first.unwrap_or(desc_order));
                descending.push(desc_order);

                // translate order expression, allowing ordinal values
                by.push(self.expr_or_ordinal(
                    &ob.expr,
                    columns,
                    selected,
                    Some(&schema),
                    "ORDER BY",
                )?)
            }
        }
        Ok(lf.sort_by_exprs(
            &by,
            SortMultipleOptions::default()
                .with_order_descending_multi(descending)
                .with_nulls_last_multi(nulls_last)
                .with_maintain_order(true),
        ))
    }

    fn process_group_by(
        &mut self,
        mut lf: LazyFrame,
        group_by_keys: &[Expr],
        projections: &[Expr],
    ) -> PolarsResult<LazyFrame> {
        let schema_before = self.get_frame_schema(&mut lf)?;
        let group_by_keys_schema =
            expressions_to_schema(group_by_keys, &schema_before, |duplicate_name: &str| {
                format!("group_by keys contained duplicate output name '{duplicate_name}'")
            })?;

        // Note: remove the `group_by` keys as Polars adds those implicitly.
        let mut aliased_aggregations: PlHashMap<PlSmallStr, PlSmallStr> = PlHashMap::new();
        let mut aggregation_projection = Vec::with_capacity(projections.len());
        let mut projection_overrides = PlHashMap::with_capacity(projections.len());
        let mut projection_aliases = PlHashSet::new();
        let mut group_key_aliases = PlHashSet::new();

        for mut e in projections {
            // `Len` represents COUNT(*) so we treat as an aggregation here.
            let is_non_group_key_expr = has_expr(e, |e| {
                match e {
                    Expr::Agg(_) | Expr::Len | Expr::Over { .. } => true,
                    #[cfg(feature = "dynamic_group_by")]
                    Expr::Rolling { .. } => true,
                    Expr::Function { function: func, .. }
                        if !matches!(func, FunctionExpr::StructExpr(_)) =>
                    {
                        // If it's a function call containing a column NOT in the group by keys,
                        // we treat it as an aggregation.
                        has_expr(e, |e| match e {
                            Expr::Column(name) => !group_by_keys_schema.contains(name),
                            _ => false,
                        })
                    },
                    _ => false,
                }
            });

            // Note: if simple aliased expression we defer aliasing until after the group_by.
            if let Expr::Alias(expr, alias) = e {
                if e.clone().meta().is_simple_projection(Some(&schema_before)) {
                    group_key_aliases.insert(alias.as_ref());
                    e = expr
                } else if let Expr::Function {
                    function: FunctionExpr::StructExpr(StructFunction::FieldByName(name)),
                    ..
                } = expr.deref()
                {
                    projection_overrides
                        .insert(alias.as_ref(), col(name.clone()).alias(alias.clone()));
                } else if !is_non_group_key_expr && !group_by_keys_schema.contains(alias) {
                    projection_aliases.insert(alias.as_ref());
                }
            }
            let field = e.to_field(&schema_before)?;
            if is_non_group_key_expr {
                let mut e = e.clone();
                if let Expr::Agg(AggExpr::Implode(expr)) = &e {
                    e = (**expr).clone();
                } else if let Expr::Alias(expr, name) = &e {
                    if let Expr::Agg(AggExpr::Implode(expr)) = expr.as_ref() {
                        e = (**expr).clone().alias(name.clone());
                    }
                }
                // If aggregation colname conflicts with a group key,
                // alias it to avoid duplicate/mis-tracked columns
                if group_by_keys_schema.get(&field.name).is_some() {
                    let alias_name = format!("__POLARS_AGG_{}", field.name);
                    e = e.alias(alias_name.as_str());
                    aliased_aggregations.insert(field.name.clone(), alias_name.as_str().into());
                }
                aggregation_projection.push(e);
            } else if let Expr::Column(_)
            | Expr::Function {
                function: FunctionExpr::StructExpr(StructFunction::FieldByName(_)),
                ..
            } = e
            {
                // Non-aggregated columns must be part of the GROUP BY clause
                if !group_by_keys_schema.contains(&field.name) {
                    polars_bail!(SQLSyntax: "'{}' should participate in the GROUP BY clause or an aggregate function", &field.name);
                }
            }
        }
        let aggregated = lf.group_by(group_by_keys).agg(&aggregation_projection);
        let projection_schema =
            expressions_to_schema(projections, &schema_before, |duplicate_name: &str| {
                format!("group_by aggregations contained duplicate output name '{duplicate_name}'")
            })?;

        // A final projection to get the proper order and any deferred transforms/aliases.
        let final_projection = projection_schema
            .iter_names()
            .zip(projections)
            .map(|(name, projection_expr)| {
                if let Some(expr) = projection_overrides.get(name.as_str()) {
                    expr.clone()
                } else if let Some(aliased_name) = aliased_aggregations.get(name) {
                    col(aliased_name.clone()).alias(name.clone())
                } else if group_by_keys_schema.get(name).is_some()
                    || projection_aliases.contains(name.as_str())
                    || group_key_aliases.contains(name.as_str())
                {
                    if has_expr(projection_expr, |e| {
                        matches!(e, Expr::Agg(_) | Expr::Len | Expr::Over { .. })
                    }) {
                        col(name.clone())
                    } else {
                        projection_expr.clone()
                    }
                } else {
                    col(name.clone())
                }
            })
            .collect::<Vec<_>>();

        Ok(aggregated.select(&final_projection))
    }

    fn process_limit_offset(
        &self,
        lf: LazyFrame,
        limit: &Option<SQLExpr>,
        offset: &Option<Offset>,
    ) -> PolarsResult<LazyFrame> {
        match (offset, limit) {
            (
                Some(Offset {
                    value: SQLExpr::Value(SQLValue::Number(offset, _)),
                    ..
                }),
                Some(SQLExpr::Value(SQLValue::Number(limit, _))),
            ) => Ok(lf.slice(
                offset
                    .parse()
                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
                limit
                    .parse()
                    .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
            )),
            (
                Some(Offset {
                    value: SQLExpr::Value(SQLValue::Number(offset, _)),
                    ..
                }),
                None,
            ) => Ok(lf.slice(
                offset
                    .parse()
                    .map_err(|e| polars_err!(SQLInterface: "OFFSET conversion error: {}", e))?,
                IdxSize::MAX,
            )),
            (None, Some(SQLExpr::Value(SQLValue::Number(limit, _)))) => Ok(lf.limit(
                limit
                    .parse()
                    .map_err(|e| polars_err!(SQLInterface: "LIMIT conversion error: {}", e))?,
            )),
            (None, None) => Ok(lf),
            _ => polars_bail!(
                SQLSyntax: "non-numeric arguments for LIMIT/OFFSET are not supported",
            ),
        }
    }

    fn process_qualified_wildcard(
        &mut self,
        ObjectName(idents): &ObjectName,
        options: &WildcardAdditionalOptions,
        modifiers: &mut SelectModifiers,
        schema: Option<&Schema>,
    ) -> PolarsResult<Vec<Expr>> {
        let mut new_idents = idents.clone();
        new_idents.push(Ident::new("*"));

        let expr = resolve_compound_identifier(self, new_idents.deref(), schema);
        self.process_wildcard_additional_options(expr?, options, modifiers, schema)
    }

    fn process_wildcard_additional_options(
        &mut self,
        exprs: Vec<Expr>,
        options: &WildcardAdditionalOptions,
        modifiers: &mut SelectModifiers,
        schema: Option<&Schema>,
    ) -> PolarsResult<Vec<Expr>> {
        if options.opt_except.is_some() && options.opt_exclude.is_some() {
            polars_bail!(SQLInterface: "EXCLUDE and EXCEPT wildcard options cannot be used together (prefer EXCLUDE)")
        } else if options.opt_exclude.is_some() && options.opt_ilike.is_some() {
            polars_bail!(SQLInterface: "EXCLUDE and ILIKE wildcard options cannot be used together")
        }

        // SELECT * EXCLUDE
        if let Some(items) = &options.opt_exclude {
            match items {
                ExcludeSelectItem::Single(ident) => {
                    modifiers.exclude.insert(ident.value.clone());
                },
                ExcludeSelectItem::Multiple(idents) => {
                    modifiers
                        .exclude
                        .extend(idents.iter().map(|i| i.value.clone()));
                },
            };
        }

        // SELECT * EXCEPT
        if let Some(items) = &options.opt_except {
            modifiers.exclude.insert(items.first_element.value.clone());
            modifiers
                .exclude
                .extend(items.additional_elements.iter().map(|i| i.value.clone()));
        }

        // SELECT * ILIKE
        if let Some(item) = &options.opt_ilike {
            let rx = regex::escape(item.pattern.as_str())
                .replace('%', ".*")
                .replace('_', ".");

            modifiers.ilike = Some(
                polars_utils::regex_cache::compile_regex(format!("^(?is){rx}$").as_str()).unwrap(),
            );
        }

        // SELECT * RENAME
        if let Some(items) = &options.opt_rename {
            let renames = match items {
                RenameSelectItem::Single(rename) => std::slice::from_ref(rename),
                RenameSelectItem::Multiple(renames) => renames.as_slice(),
            };
            for rn in renames {
                let before = PlSmallStr::from_str(rn.ident.value.as_str());
                let after = PlSmallStr::from_str(rn.alias.value.as_str());
                if before != after {
                    modifiers.rename.insert(before, after);
                }
            }
        }

        // SELECT * REPLACE
        if let Some(replacements) = &options.opt_replace {
            for rp in &replacements.items {
                let replacement_expr = parse_sql_expr(&rp.expr, self, schema);
                modifiers
                    .replace
                    .push(replacement_expr?.alias(rp.column_name.value.as_str()));
            }
        }
        Ok(exprs)
    }

    fn rename_columns_from_table_alias(
        &mut self,
        mut lf: LazyFrame,
        alias: &TableAlias,
    ) -> PolarsResult<LazyFrame> {
        if alias.columns.is_empty() {
            Ok(lf)
        } else {
            let schema = self.get_frame_schema(&mut lf)?;
            if alias.columns.len() != schema.len() {
                polars_bail!(
                    SQLSyntax: "number of columns ({}) in alias '{}' does not match the number of columns in the table/query ({})",
                    alias.columns.len(), alias.name.value, schema.len()
                )
            } else {
                let existing_columns: Vec<_> = schema.iter_names().collect();
                let new_columns: Vec<_> =
                    alias.columns.iter().map(|c| c.name.value.clone()).collect();
                Ok(lf.rename(existing_columns, new_columns, true))
            }
        }
    }
}

impl SQLContext {
    /// Get internal table map. For internal use only.
    pub fn get_table_map(&self) -> PlHashMap<String, LazyFrame> {
        self.table_map.clone()
    }

    /// Create a new SQLContext from a table map. For internal use only
    pub fn new_from_table_map(table_map: PlHashMap<String, LazyFrame>) -> Self {
        Self {
            table_map,
            ..Default::default()
        }
    }
}

fn expand_exprs(expr: Expr, schema: &SchemaRef) -> Vec<Expr> {
    match expr {
        Expr::Column(nm) if is_regex_colname(nm.as_str()) => {
            let re = polars_utils::regex_cache::compile_regex(&nm).unwrap();
            schema
                .iter_names()
                .filter(|name| re.is_match(name))
                .map(|name| col(name.clone()))
                .collect::<Vec<_>>()
        },
        Expr::Selector(s) => s
            .into_columns(schema, &Default::default())
            .unwrap()
            .into_iter()
            .map(col)
            .collect::<Vec<_>>(),
        _ => vec![expr],
    }
}

fn is_regex_colname(nm: &str) -> bool {
    nm.starts_with('^') && nm.ends_with('$')
}

/// Visitor that checks if an expression tree contains a reference to a specific table.
struct FindTableIdentifier<'a> {
    table_name: &'a str,
    found: bool,
}

impl<'a> Visitor for FindTableIdentifier<'a> {
    type Break = ();

    fn pre_visit_expr(&mut self, expr: &SQLExpr) -> ControlFlow<Self::Break> {
        if let SQLExpr::CompoundIdentifier(idents) = expr {
            if idents.len() >= 2 && idents[0].value.as_str() == self.table_name {
                self.found = true; // return immediately on first match
                return ControlFlow::Break(());
            }
        }
        ControlFlow::Continue(())
    }
}

/// Check if all columns referred to in a Polars expression exist in the given Schema.
fn expr_cols_all_in_schema(expr: &Expr, schema: &Schema) -> bool {
    let mut found_cols = false;
    let mut all_in_schema = true;
    for e in expr.into_iter() {
        if let Expr::Column(name) = e {
            found_cols = true;
            if !schema.contains(name.as_str()) {
                all_in_schema = false;
                break;
            }
        }
    }
    found_cols && all_in_schema
}

/// Check if a SQL expression contains a reference to a specific table.
fn expr_refers_to_table(expr: &SQLExpr, table_name: &str) -> bool {
    let mut table_finder = FindTableIdentifier {
        table_name,
        found: false,
    };
    let _ = expr.visit(&mut table_finder);
    table_finder.found
}

/// Determine which parsed join expressions actually belong in `left_om` and which in `right_on`.
///
/// This needs to be handled carefully because in SQL joins you can write "join on" constraints
/// either way round, and in joins with more than two tables you can also join against an earlier
/// table (e.g.: you could be joining `df1` to `df2` to `df3`, but the final join condition where
/// we join  `df2` to `df3` could refer to `df1.a = df3.b`; this takes a little more work to
/// resolve as our native `join` function operates on only two tables at a time.
fn determine_left_right_join_on(
    ctx: &mut SQLContext,
    expr_left: &SQLExpr,
    expr_right: &SQLExpr,
    tbl_left: &TableInfo,
    tbl_right: &TableInfo,
    join_schema: &Schema,
) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
    // parse, removing any aliases that may have been added by `resolve_column`
    // (called inside `parse_sql_expr`) as we need the actual/underlying col
    let left_on = match parse_sql_expr(expr_left, ctx, Some(join_schema))? {
        Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
        e => e,
    };
    let right_on = match parse_sql_expr(expr_right, ctx, Some(join_schema))? {
        Expr::Alias(inner, _) => Arc::unwrap_or_clone(inner),
        e => e,
    };

    // ------------------------------------------------------------------
    // simple/typical case: can fully resolve SQL-level table references
    // ------------------------------------------------------------------
    let left_refs = (
        expr_refers_to_table(expr_left, &tbl_left.name),
        expr_refers_to_table(expr_left, &tbl_right.name),
    );
    let right_refs = (
        expr_refers_to_table(expr_right, &tbl_left.name),
        expr_refers_to_table(expr_right, &tbl_right.name),
    );
    // if the SQL-level references unambiguously indicate table ownership, we're done
    match (left_refs, right_refs) {
        // standard: left expr → left table, right expr → right table
        ((true, false), (false, true)) => return Ok((vec![left_on], vec![right_on])),
        // reversed: left expr → right table, right expr → left table
        ((false, true), (true, false)) => return Ok((vec![right_on], vec![left_on])),
        // unsupported: one side references *both* tables
        ((true, true), _) | (_, (true, true)) if tbl_left.name != tbl_right.name => {
            polars_bail!(
               SQLInterface: "unsupported join condition: {} side references both '{}' and '{}'",
               if left_refs.0 && left_refs.1 {
                    "left"
                } else {
                    "right"
                }, tbl_left.name, tbl_right.name
            )
        },
        // fall through to the more involved col/ref resolution
        _ => {},
    }

    // ------------------------------------------------------------------
    // more involved: additionally employ schema-based column resolution
    // (applies to unqualified columns and/or chained joins)
    // ------------------------------------------------------------------
    let left_on_cols_in = (
        expr_cols_all_in_schema(&left_on, &tbl_left.schema),
        expr_cols_all_in_schema(&left_on, &tbl_right.schema),
    );
    let right_on_cols_in = (
        expr_cols_all_in_schema(&right_on, &tbl_left.schema),
        expr_cols_all_in_schema(&right_on, &tbl_right.schema),
    );
    match (left_on_cols_in, right_on_cols_in) {
        // each expression's columns exist in exactly one schema
        ((true, false), (false, true)) => Ok((vec![left_on], vec![right_on])),
        ((false, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
        // one expression in both, other only in one; prefer the unique one
        ((true, true), (true, false)) => Ok((vec![right_on], vec![left_on])),
        ((true, true), (false, true)) => Ok((vec![left_on], vec![right_on])),
        ((true, false), (true, true)) => Ok((vec![left_on], vec![right_on])),
        ((false, true), (true, true)) => Ok((vec![right_on], vec![left_on])),
        // pass through as-is
        _ => Ok((vec![left_on], vec![right_on])),
    }
}

fn process_join_on(
    ctx: &mut SQLContext,
    sql_expr: &SQLExpr,
    tbl_left: &TableInfo,
    tbl_right: &TableInfo,
) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
    match sql_expr {
        SQLExpr::BinaryOp { left, op, right } => match op {
            BinaryOperator::And => {
                let (mut left_i, mut right_i) = process_join_on(ctx, left, tbl_left, tbl_right)?;
                let (mut left_j, mut right_j) = process_join_on(ctx, right, tbl_left, tbl_right)?;
                left_i.append(&mut left_j);
                right_i.append(&mut right_j);
                Ok((left_i, right_i))
            },
            BinaryOperator::Eq => {
                // establish unified schema with cols from both tables; needed for multi/chained
                // joins where suffixed intermediary/joined cols aren't in an existing schema.
                let mut join_schema =
                    Schema::with_capacity(tbl_left.schema.len() + tbl_right.schema.len());
                for (name, dtype) in tbl_left.schema.iter() {
                    join_schema.insert_at_index(join_schema.len(), name.clone(), dtype.clone())?;
                }
                for (name, dtype) in tbl_right.schema.iter() {
                    if !join_schema.contains(name) {
                        join_schema.insert_at_index(
                            join_schema.len(),
                            name.clone(),
                            dtype.clone(),
                        )?;
                    }
                }
                determine_left_right_join_on(ctx, left, right, tbl_left, tbl_right, &join_schema)
            },
            _ => polars_bail!(
                // TODO: should be able to support more operators later (via `join_where`?)
                SQLInterface: "only equi-join constraints (combined with 'AND') are currently supported; found op = '{:?}'", op
            ),
        },
        SQLExpr::Nested(expr) => process_join_on(ctx, expr, tbl_left, tbl_right),
        _ => polars_bail!(
            SQLInterface: "only equi-join constraints are currently supported; found expression = {:?}", sql_expr
        ),
    }
}

fn process_join_constraint(
    constraint: &JoinConstraint,
    tbl_left: &TableInfo,
    tbl_right: &TableInfo,
    ctx: &mut SQLContext,
) -> PolarsResult<(Vec<Expr>, Vec<Expr>)> {
    match constraint {
        JoinConstraint::On(expr @ SQLExpr::BinaryOp { .. }) => {
            process_join_on(ctx, expr, tbl_left, tbl_right)
        },
        JoinConstraint::Using(idents) if !idents.is_empty() => {
            let using: Vec<Expr> = idents.iter().map(|id| col(id.value.as_str())).collect();
            Ok((using.clone(), using))
        },
        JoinConstraint::Natural => {
            let left_names = tbl_left.schema.iter_names().collect::<PlHashSet<_>>();
            let right_names = tbl_right.schema.iter_names().collect::<PlHashSet<_>>();
            let on: Vec<Expr> = left_names
                .intersection(&right_names)
                .map(|&name| col(name.clone()))
                .collect();
            if on.is_empty() {
                polars_bail!(SQLInterface: "no common columns found for NATURAL JOIN")
            }
            Ok((on.clone(), on))
        },
        _ => polars_bail!(SQLInterface: "unsupported SQL join constraint:\n{:?}", constraint),
    }
}

bitflags::bitflags! {
    /// Bitfield indicating whether there exists a projection with the specified height behavior.
    ///
    /// Used to help determine whether to execute projections in `select()` or `with_columns()`
    /// context.
    #[derive(PartialEq)]
    struct ExprSqlProjectionHeightBehavior: u8 {
        /// Maintains the height of input column(s)
        const MaintainsColumn = 1 << 0;
        /// Height is independent of input, e.g.:
        /// * expressions that change length: e.g. slice, explode, filter, gather etc.
        /// * aggregations: count(*), first(), sum() etc.
        const Independent = 1 << 1;
        /// "Inherits" the height of the context, e.g.:
        /// * Scalar literals
        const InheritsContext = 1 << 2;
    }
}

impl ExprSqlProjectionHeightBehavior {
    fn identify_from_expr(expr: &Expr) -> Self {
        let mut has_column = false;
        let mut has_independent = false;

        for e in expr.into_iter() {
            use Expr::*;
            has_column |= matches!(e, Column(_) | Selector(_));
            has_independent |= match e {
                // @TODO: This is broken now with functions.
                AnonymousFunction { options, .. } => {
                    options.returns_scalar() || !options.is_length_preserving()
                },
                Literal(v) => !v.is_scalar(),
                Explode { .. } | Filter { .. } | Gather { .. } | Slice { .. } => true,
                Agg { .. } | Len => true,
                _ => false,
            }
        }
        if has_independent {
            Self::Independent
        } else if has_column {
            Self::MaintainsColumn
        } else {
            Self::InheritsContext
        }
    }
}
